Spark MLlib এবং Spark SQL এর Integration গাইড ও নোট

Big Data and Analytics - স্পার্ক এসকিউএল (Spark SQL) - Machine Learning এবং Spark SQL Integration
453

Apache Spark একটি অত্যন্ত শক্তিশালী এবং স্কেলেবল ফ্রেমওয়ার্ক যা Spark SQL এবং Spark MLlib এর মাধ্যমে ডেটা প্রসেসিং এবং মেশিন লার্নিং উভয় ক্ষেত্রেই কার্যকরী সমাধান সরবরাহ করে। Spark SQL ডেটা বিশ্লেষণ এবং ট্রান্সফর্মেশনের জন্য ব্যবহৃত হয়, আর Spark MLlib হল একটি মেশিন লার্নিং লাইব্রেরি যা স্ট্রাকচারড ডেটার ওপর মডেল তৈরি, প্রশিক্ষণ এবং পূর্বাভাস করতে সহায়তা করে।

Spark MLlib এবং Spark SQL এর মধ্যে ইন্টিগ্রেশন ব্যবহার করে আপনি SQL কোয়ারি এবং মেশিন লার্নিং মডেল একই ডেটাসেটে প্রয়োগ করতে পারেন, যা ডেটা প্রস্তুতি থেকে শুরু করে মডেল তৈরির পর্যন্ত একসাথে কাজ করার সুবিধা দেয়।

এখানে আমরা আলোচনা করব কিভাবে Spark MLlib এবং Spark SQL একত্রে ব্যবহার করা যায়।


1. Spark SQL এবং Spark MLlib এর Integration

Spark SQL এবং MLlib এর মধ্যে ইন্টিগ্রেশন অনেক সুবিধা নিয়ে আসে, যেমন:

  • Structured Data থেকে মেশিন লার্নিং মডেল তৈরি করা।
  • SQL কোয়ারির মাধ্যমে ডেটা প্রস্তুতি (Data Preparation) এবং ট্রান্সফর্মেশন সহজ করা।
  • DataFrame/Dataset API-এর মাধ্যমে ডেটা ম্যানিপুলেশন এবং মডেল ট্রেনিং করা।

Spark SQL DataFrame API এর মাধ্যমে সহজেই SQL কোয়ারি চালিয়ে ডেটা প্রস্তুতি করতে পারেন এবং MLlib API দিয়ে সেই ডেটার ওপর মডেল তৈরি এবং প্রশিক্ষণ করতে পারেন।


2. Spark SQL-এ ডেটা প্রস্তুতি

Spark SQL-এর DataFrame API ব্যবহার করে ডেটা প্রস্তুতি করা খুবই সহজ। এই API-তে SQL কোয়ারি এবং ট্রান্সফর্মেশন ব্যবহার করে ডেটাকে মেশিন লার্নিং মডেল তৈরির জন্য প্রস্তুত করা হয়।

উদাহরণ: DataFrame-এ SQL কোয়ারি এবং ট্রান্সফর্মেশন প্রয়োগ

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# SparkSession তৈরি
spark = SparkSession.builder.appName("Spark SQL and MLlib Integration").getOrCreate()

# DataFrame তৈরি
data = [(0, 1.0, 1.1, 0.1), (1, 1.2, 1.3, 0.2), (0, 2.1, 2.2, 0.3), (1, 3.1, 3.2, 0.4)]
columns = ["label", "feature1", "feature2", "feature3"]
df = spark.createDataFrame(data, columns)

# SQL কোয়ারি প্রয়োগ
df.createOrReplaceTempView("data")
result_df = spark.sql("SELECT * FROM data WHERE feature1 > 1.0")

# Feature vector তৈরি (MLlib DataFrame format)
assembler = VectorAssembler(inputCols=["feature1", "feature2", "feature3"], outputCol="features")
data_prepared = assembler.transform(result_df)

# মেশিন লার্নিং মডেল ট্রেনিং
lr = LogisticRegression(featuresCol="features", labelCol="label")
model = lr.fit(data_prepared)

# মডেল ব্যবহার করে পূর্বাভাস করা
predictions = model.transform(data_prepared)
predictions.show()

এখানে:

  • SQL কোয়ারি ব্যবহার করে feature1 এর মান 1.0 এর বেশি এমন রেকর্ডগুলো ফিল্টার করা হয়েছে।
  • VectorAssembler ব্যবহার করে feature1, feature2, এবং feature3 কলামগুলোকে একত্রিত করে একটি ফিচার ভেক্টর তৈরি করা হয়েছে।
  • তারপর সেই ফিচার ভেক্টর ব্যবহার করে Logistic Regression মডেল তৈরি এবং পূর্বাভাস করা হয়েছে।

3. MLlib মডেল ট্রেনিং এবং SQL ব্যবহার

Spark MLlib-এর মাধ্যমে তৈরি করা মডেলগুলোকে SQL কোয়ারি এবং DataFrame API ব্যবহার করে ডেটার সাথে ইন্টিগ্রেট করা যায়। Spark SQL ব্যবহার করে আপনি মডেল ট্রেনিংয়ের জন্য ডেটা প্রস্তুত করতে পারেন এবং মডেল তৈরির পর SQL কোয়ারি দিয়ে পূর্বাভাস বা প্রেডিকশন করতে পারেন।

উদাহরণ: SQL কোয়ারি দ্বারা Prediction

# Model predictions এ SQL কোয়ারি ব্যবহার
predictions.createOrReplaceTempView("predictions")
predicted_result = spark.sql("SELECT features, prediction FROM predictions WHERE prediction = 1.0")

# Prediction দেখানো
predicted_result.show()

এখানে, পূর্বাভাসের ফলাফল একটি SQL টেবিল হিসেবে রেজিস্টার করা হয়েছে এবং SQL কোয়ারি ব্যবহার করে prediction কলামের মান 1.0 এমন রেকর্ডগুলো ফিল্টার করা হয়েছে।


4. SQL থেকে DataFrame এবং UDF ব্যবহার

Spark SQL-এ User Defined Functions (UDFs) ব্যবহার করে কাস্টম ট্রান্সফর্মেশন এবং মডেল তৈরি করা যেতে পারে। UDF ব্যবহার করে মেশিন লার্নিং মডেলের উপরে কাস্টম লজিক প্রয়োগ করা যেতে পারে, যেমন ডেটা প্রিপ্রসেসিং বা অ্যাগ্রিগেশন।

উদাহরণ: UDF ব্যবহার

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# UDF তৈরি: কাস্টম স্কোরিং ফাংশন
def custom_score(features):
    return float(features[0]) + float(features[1]) + float(features[2])

# UDF রেজিস্টার করা
custom_score_udf = udf(custom_score, DoubleType())

# UDF ব্যবহার করে নতুন কলাম তৈরি
result_df = data_prepared.withColumn("custom_score", custom_score_udf(data_prepared["features"]))

result_df.show()

এখানে:

  • একটি কাস্টম স্কোরিং ফাংশন (UDF) তৈরি করা হয়েছে, যা features কলাম থেকে মান নিয়ে একটি কাস্টম স্কোর গণনা করবে।
  • তারপর সেই UDF ব্যবহার করে নতুন custom_score কলাম তৈরি করা হয়েছে।

5. Batch এবং Streaming Data Integration with MLlib

Spark SQL-এ Batch এবং Streaming ডেটা উভয়ের সঙ্গে MLlib মডেল ব্যবহার করা যেতে পারে। Structured Streaming API ব্যবহার করে স্ট্রিমিং ডেটার ওপর মডেল ট্রেনিং বা পূর্বাভাস করা যেতে পারে। স্ট্রিমিং ডেটার জন্য মডেলটি ধারাবাহিকভাবে আপডেট করা যেতে পারে।

উদাহরণ: Batch এবং Streaming Data এর জন্য Integration

# Streaming Data লোড করা
streaming_df = spark.readStream.format("json").load("path/to/streaming_data")

# Batch Data লোড করা
batch_df = spark.read.parquet("path/to/batch_data")

# Batch Data এবং Streaming Data কে একত্রিত করা
joined_df = batch_df.join(streaming_df, "id")

# Mllib মডেল ব্যবহার
predictions = model.transform(joined_df)

# Write results
query = predictions.writeStream.outputMode("append").format("console").start()
query.awaitTermination()

এখানে:

  • Batch এবং Streaming Data একত্রিত করা হচ্ছে এবং MLlib মডেল প্রয়োগ করা হচ্ছে।

সারাংশ

Spark SQL এবং Spark MLlib এর মধ্যে ইন্টিগ্রেশন দ্বারা, আপনি SQL কোয়ারি এবং মেশিন লার্নিং মডেল একই ডেটাসেটে প্রয়োগ করতে পারেন। Hive Functions এবং UDFs ব্যবহার করে কাস্টম লজিক এবং ট্রান্সফর্মেশন করা যায়, এবং Structured Streaming এর মাধ্যমে Batch এবং Streaming ডেটা একত্রিত করা সম্ভব হয়। Spark SQL-এর সাথে Spark MLlib-এর সমন্বয়ে আপনি ডেটা বিশ্লেষণ এবং মডেল ট্রেনিংয়ের জন্য একটি শক্তিশালী সমাধান তৈরি করতে পারেন, যা ডেটার সম্পূর্ণ প্রক্রিয়াকে সিমলেস এবং স্কেলেবল করে তোলে।

Content added By
Promotion

Are you sure to start over?

Loading...